import io.openmessaging.KeyValue;
import io.openmessaging.Message;
import io.openmessaging.PullConsumer;
import io.openmessaging.demo.DefaultKeyValue;
import io.openmessaging.demo.DefaultPullConsumer;

import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Author :  Rocky
 * Date : 20/05/2017 15:25
 * Description : Manual read benchmark. Splits a fixed list of topic names into
 *               batches, starts one thread per batch that polls a
 *               {@link PullConsumer} until it returns {@code null}, then prints
 *               the elapsed wall-clock time and the total number of messages polled.
 * Test :
 */
public class ReadMsgTest {

    /** Store path used when no path is supplied on the command line. */
    private static final String DEFAULT_STORE_PATH = "/Users/zuji/Documents/project/open-messaging-demo";

    /** Number of topic names generated ("topic0" .. "topic99"). */
    private static final int TOPIC_COUNT = 100;

    /** Number of topic names handed to each consumer thread. */
    private static final int TOPICS_PER_CONSUMER = 10;

    /**
     * Runs the benchmark.
     *
     * @param args optional; {@code args[0]} is the value stored under the
     *             {@code STORE_PATH} key, falling back to {@link #DEFAULT_STORE_PATH}
     * @throws Exception if the main thread is interrupted while waiting for the workers
     */
    public static void main(String[] args) throws Exception {
        String path = (args == null || args.length == 0 || args[0] == null)
                ? DEFAULT_STORE_PATH
                : args[0];

        KeyValue kvs = new DefaultKeyValue();
        kvs.put("STORE_PATH", path);

        List<Set<String>> batches = partitionTopics(TOPIC_COUNT, TOPICS_PER_CONSUMER);

        AtomicInteger received = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(batches.size());

        long start = System.currentTimeMillis();
        for (Set<String> batch : batches) {
            new Thread(() -> drain(kvs, batch, received, latch)).start();
        }
        latch.await();
        long end = System.currentTimeMillis();

        System.out.println((end - start) + " ms");
        System.out.println(received.get());
    }

    /**
     * Builds the names "topic0" .. "topic(total-1)" and groups them into sets of
     * at most {@code batchSize} names each.
     *
     * @param total     number of topic names to generate
     * @param batchSize maximum number of names per set
     * @return the batches; a trailing partial batch is kept rather than dropped
     */
    private static List<Set<String>> partitionTopics(int total, int batchSize) {
        List<Set<String>> batches = new ArrayList<>();
        Set<String> current = new HashSet<>(batchSize * 2);
        for (int n = 0; n < total; n++) {
            current.add("topic" + n);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new HashSet<>(batchSize * 2);
            }
        }
        // Keep the remainder so no topic is silently skipped when total is not
        // a multiple of batchSize.
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    /**
     * Worker body: creates a consumer, attaches it under a random name with the
     * given batch, and polls until {@code poll()} returns {@code null}, counting
     * every message returned.
     *
     * @param kvs      configuration passed to the consumer constructor
     * @param batch    names passed to {@code attachQueue}
     * @param received shared counter incremented once per polled message
     * @param latch    counted down exactly once when this worker ends
     */
    private static void drain(KeyValue kvs, Set<String> batch, AtomicInteger received, CountDownLatch latch) {
        try {
            PullConsumer consumer = new DefaultPullConsumer(kvs);
            consumer.attachQueue(UUID.randomUUID().toString(), batch);
            while (consumer.poll() != null) {
                received.incrementAndGet();
            }
        } finally {
            // Always release the latch: if the consumer throws, the exception still
            // reaches the thread's uncaught-exception handler, but main() must not
            // wait forever on a worker that has already died.
            latch.countDown();
        }
    }

}